# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
FFT iterface for fast Fourier Transforms using scipy fftpack backend (using scipy).
:class:`~hysop.numerics.ScipyFFT`
:class:`~hysop.numerics.ScipyFFTPlan`
"""
import numpy as np
import scipy as sp
from scipy import fftpack as _FFT
from hysop.tools.htypes import first_not_None
from hysop.numerics.fft.host_fft import HostFFTPlanI, HostFFTI, HostArray
from hysop.numerics.fft.fft import (
complex_to_float_dtype,
float_to_complex_dtype,
mk_shape,
mk_view,
)
[docs]
class ScipyFFTPlan(HostFFTPlanI):
"""
Wrap a scipy fftpack call (scipy.fftpack does not offer real planning capabilities).
"""
def __init__(self, fn, x, out, axis, scaling=None, **kwds):
super().__init__()
self.fn = fn
self.x = x
self.out = out
self.axis = axis
self.scaling = scaling
kwds = kwds.copy()
kwds["x"] = x
kwds["axis"] = axis
self.kwds = kwds
@property
def input_array(self):
return self.x
@property
def output_array(self):
return self.out
[docs]
def execute(self):
fn = self.fn
fn_kwds = self.kwds.copy()
if fn is _FFT.irfft:
assert "n" in fn_kwds
x = self.x
out = self.out
if isinstance(x, HostArray):
x = x.handle
if isinstance(out, HostArray):
out = out.handle
axis = self.axis
mkv = lambda *args, **kwds: mk_view(x.ndim, axis, *args, **kwds)
if fn is _FFT.irfft:
try:
x = x.view(dtype=complex_to_float_dtype(x.dtype))
except ValueError:
x = x.copy().view(dtype=complex_to_float_dtype(x.dtype))
is_even = fn_kwds["n"] % 2 == 0
rshape = list(x.shape)
rshape[axis] -= 1 + is_even
_in = np.empty(shape=rshape, dtype=x.dtype)
_in[mkv(1, None)] = x[mkv(2, x.shape[axis] - is_even)]
_in[mkv(0)] = x[mkv(0)]
fn_kwds["x"] = _in
# custom implementation of DCT-I and DST-I
# default scipy FFTPACK implementation has optimal complexity
# but O(sqrt(N)) error.
if fn in (_FFT.dct, _FFT.idct) and fn_kwds["type"] == 1:
N = x.shape[axis]
shape = x.shape
ndim = x.ndim
slc0 = mk_view(ndim, axis, 1, -1)
slc1 = mk_view(ndim, axis, None, None, -1)
slc2 = mk_view(ndim, axis, 2, N, None)
slc3 = mk_view(ndim, axis, 3, None, 2)
slc4 = mk_view(ndim, axis, None, N, None)
s0 = mk_shape(shape, axis, 2 * N - 2)
X = np.empty(shape=s0, dtype=x.dtype)
np.concatenate((x, x[slc0][slc1]), axis=axis, out=X)
res = _FFT.rfft(X, axis=axis)
res[slc2] = res[slc3]
res = res[slc4]
elif fn in (_FFT.dst, _FFT.idst) and fn_kwds["type"] == 1:
N = x.shape[axis]
shape = x.shape
ndim = x.ndim
slc0 = mk_view(ndim, axis, None, None, -1)
slc1 = mk_view(ndim, axis, 2, None, 2)
s0 = mk_shape(shape, axis, 2 * N + 2)
s1 = mk_shape(shape, axis, 1)
X = np.empty(shape=s0, dtype=x.dtype)
Z = np.zeros(shape=s1, dtype=x.dtype)
np.concatenate((Z, -x, Z, x[slc0]), axis=axis, out=X)
res = _FFT.rfft(X, axis=axis)[slc1]
else:
res = fn(**fn_kwds)
if fn is _FFT.rfft:
assert axis in (x.ndim - 1, -1)
is_even = x.shape[axis] % 2 == 0
rshape = list(res.shape)
rshape[axis] += 1 + is_even
if (out is None) or (not out.flags.c_contiguous):
real_output = out
out = np.empty(dtype=x.dtype, shape=rshape)
else:
rtype = complex_to_float_dtype(out.dtype)
real_output = None
out = out.view(dtype=rtype)
assert np.array_equal(out.shape, rshape)
ctype = float_to_complex_dtype(out.dtype)
out[mkv(1, out.shape[axis] - is_even)] = res
out[mkv(0)] = out[mkv(1)]
out[mkv(1)] = 0.0
if is_even:
out[mkv(-1)] = 0.0
out = out.view(dtype=ctype)
if real_output is not None:
real_output[...] = out
out = real_output
elif out is not None:
out[...] = res
else:
out = res
scaling = self.scaling
if scaling is not None:
out[...] *= scaling
return out
[docs]
class ScipyFFT(HostFFTI):
"""
Interface to compute local to process FFT-like transforms using the scipy fftpack backend.
Scipy fftpack backend has some disadvantages:
- creation of one or two intermediate temporary buffers at each call
- hermitian-complex data layout is different than all other fft implementations
- no planning capabilities (scipy.fftpack methods are just wrapped into fake plans)
It has native float and double precision support unlike the numpy fft backend.
Planning won't destroy original inputs.
"""
def __init__(self, backend=None, allocator=None, warn_on_allocation=True, **kwds):
super().__init__(
backend=backend,
allocator=allocator,
warn_on_allocation=warn_on_allocation,
**kwds,
)
self.supported_ftypes = (
np.float32,
np.float64,
)
self.supported_ctypes = (
np.complex64,
np.complex128,
)
[docs]
def fft(self, a, out=None, axis=-1, **kwds):
(shape, dtype) = super().fft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(fn=_FFT.fft, x=a, out=out, axis=axis, **kwds)
return plan
[docs]
def ifft(self, a, out=None, axis=-1, **kwds):
(shape, dtype, s) = super().ifft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(fn=_FFT.ifft, x=a, out=out, axis=axis, **kwds)
return plan
[docs]
def rfft(self, a, out=None, axis=-1, **kwds):
(shape, dtype) = super().rfft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(fn=_FFT.rfft, x=a, out=out, axis=axis, **kwds)
return plan
[docs]
def irfft(self, a, out=None, n=None, axis=-1, **kwds):
(shape, dtype, s) = super().irfft(a=a, out=out, n=n, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(
fn=_FFT.irfft, x=a, out=out, axis=axis, n=shape[axis], **kwds
)
return plan
[docs]
def dct(self, a, out=None, type=2, axis=-1, **kwds):
(shape, dtype) = super().dct(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(fn=_FFT.dct, x=a, out=out, axis=axis, type=type, **kwds)
return plan
[docs]
def idct(self, a, out=None, type=2, axis=-1, scaling=None, **kwds):
(shape, dtype, _, s) = super().idct(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(
fn=_FFT.idct,
x=a,
out=out,
axis=axis,
type=type,
scaling=first_not_None(scaling, 1.0 / s),
**kwds,
)
return plan
[docs]
def dst(self, a, out=None, type=2, axis=-1, **kwds):
(shape, dtype) = super().dst(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(fn=_FFT.dst, x=a, out=out, axis=axis, type=type, **kwds)
return plan
[docs]
def idst(self, a, out=None, type=2, axis=-1, scaling=None, **kwds):
(shape, dtype, _, s) = super().idst(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = ScipyFFTPlan(
fn=_FFT.idst,
x=a,
out=out,
axis=axis,
type=type,
scaling=first_not_None(scaling, 1.0 / s),
**kwds,
)
return plan